package org.luosl.webmagicx.monitor

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import akka.http.scaladsl.server.Route
import org.luosl.webmagicx.SpiderCell
import org.luosl.webmagicx.listeners.{GeneralHandlerListener, GeneralPipelineListener, GeneralProcessorListener, GeneralSpiderListener}
import org.luosl.webmagicx.monitor.action.{SpiderAction, SystemAction}
import org.luosl.webmagicx.processor.AbstractProcessor
import us.codecraft.webmagic.SpiderListener
import akka.http.scaladsl.server.Directives.{path, _}

import scala.collection.JavaConverters._

/**
  * HTTP-based spider monitor.
  *
  * Holds a [[TaskMonitor]] to which spider tasks are submitted, an HTTP server that serves the
  * routes returned by `routes`, and a map of [[MonitorCell]]s describing every registered spider.
  *
  * @param host         host passed to the HTTP server
  * @param port         port passed to the HTTP server
  * @param exitWhenIdle when true, a daemon thread is started at construction time that calls
  *                     `shutdown()` once `isIdle` reports true
  * @param maxIdleTime  number of seconds the idle-watcher thread sleeps before it starts polling
  *                     `isIdle` (only used when `exitWhenIdle` is true)
  */
class HttpSpiderMonitor(host: String, port: Int, exitWhenIdle: Boolean = false, maxIdleTime:Int = 10) {

  /** Monitoring data of every registered spider, keyed by the spider cell id. */
  val monitorCellMap:ConcurrentHashMap[String,MonitorCell] = new ConcurrentHashMap[String,MonitorCell]()

  /** Spider task management. */
  private val taskMonitor:TaskMonitor = new TaskMonitor()

  /**
    * Web actions used by the routes.
    * They are initialised before `httpServer` because `routes` is evaluated while the server
    * is being constructed and must not depend on the route bodies being evaluated lazily.
    */
  private val systemAction:SystemAction = SystemAction(this)
  private val spiderAction:SpiderAction = SpiderAction(this)

  /** HTTP server. */
  private val httpServer = AkkaHttpServer(host, port, routes, "webmagicx-httpMonitor-system")

  /**
    * Idle detection: wait `maxIdleTime` seconds, then poll `isIdle` every 2 seconds and shut the
    * monitor down as soon as it reports true.
    */
  if(exitWhenIdle){
    val idleWatcher:Thread = new Thread{
      override def run(): Unit = {
        // Initial grace period before the first idle check.
        TimeUnit.SECONDS.sleep(maxIdleTime)
        while(!isIdle) TimeUnit.SECONDS.sleep(2)
        shutdown()
      }
    }
    idleWatcher.setName("HttpSpiderMonitor exitWhenComplete thread")
    // Daemon thread: it must not keep the JVM alive on its own.
    idleWatcher.setDaemon(true)
    idleWatcher.start()
  }

  /**
    * Defines the HTTP routes.
    *
    * For the `spider/state`, `spider/kill` and `spider/conf` routes the remainder of the path is
    * passed to the action as the spider id; `spider/submit` passes the request entity as a string.
    *
    * @return the routes served by the HTTP server
    */
  def routes:Seq[Route] = {
    Seq(
      path("")(systemAction.overview()),
      path("system" / "overview")(systemAction.overview()),
      path("system" / "shutdown")(systemAction.shutdown()),
      path("spider"/ "state" / Remaining)(id=> spiderAction.spiderState(id)),
      ( path("spider" / "submit") & entity(as[String]) )(data=> spiderAction.submit(data)),
      path("spider" / "kill" / Remaining)(id=> spiderAction.kill(id)),
      path("spider" / "conf" / Remaining)(id=> spiderAction.spiderConf(id))
    )
  }


  /**
    * Submits a spider task and registers the monitoring listeners for it.
    *
    * @param spiderCell the spider to submit
    * @return the id of the submitted spider cell
    */
  def submitSpiderTask(spiderCell: SpiderCell): String ={
    // NOTE(review): listeners are attached after the task has been handed to the task monitor;
    // if the task monitor starts the spider immediately, early events may be missed - confirm.
    taskMonitor.submitSpiderTask(spiderCell)
    register(spiderCell)
    spiderCell.id
  }

  /**
    * Attaches monitoring listeners to the spider, its processor, its handlers and its pipelines,
    * and stores them in `monitorCellMap` under the spider cell id.
    *
    * @param spiderCell the spider to monitor
    */
  def register(spiderCell:SpiderCell): Unit ={

    // Spider listener: prepended to the listeners already set on the spider.
    val spiderListener:GeneralSpiderListener = {
      val listener:GeneralSpiderListener = new GeneralSpiderListener(spiderCell)
      // The spider may not have a listener list yet (null); treat that as empty.
      val existing:List[SpiderListener] =
        Option(spiderCell.spider.getSpiderListeners).map(_.asScala.toList).getOrElse(Nil)
      val sls:List[SpiderListener] = listener :: existing
      spiderCell.spider.setSpiderListeners(sls.asJava)
      listener
    }

    // Processor listener.
    val processorListener:GeneralProcessorListener = {
      val processor: AbstractProcessor =  spiderCell.processor
      val pls = new GeneralProcessorListener(processor.getClass)
      processor.addListener(pls)
      pls
    }

    // Handler listeners, one per handler.
    val handlerListener:Seq[GeneralHandlerListener] = spiderCell.handlers.map{ handler=>
      val hl:GeneralHandlerListener = new GeneralHandlerListener(handler.getClass)
      handler.addListener(hl)
      hl
    }

    // Pipeline listeners, one per pipeline.
    val pipelineListeners:Seq[GeneralPipelineListener] = spiderCell.pipelines.map{ pipeline =>
      val pipelineListener = new GeneralPipelineListener(pipeline.getClass)
      pipeline.addListener(pipelineListener)
      pipelineListener
    }

    monitorCellMap.put(spiderCell.id,
      MonitorCell(spiderCell,spiderListener,processorListener,handlerListener,pipelineListeners))
  }

  /**
    * Kills a spider and removes its task from the task monitor.
    *
    * @param id id of the spider to kill
    * @return true if a spider was registered under `id` and has been killed,
    *         false if no spider is registered under `id`
    */
  def kill(id:String):Boolean = {
    // ConcurrentHashMap.get returns null for an unknown key; do not dereference it blindly.
    Option(monitorCellMap.get(id)) match {
      case Some(cell) =>
        cell.spiderCell.spider.kill()
        taskMonitor.remove(id)
        true
      case None => false
    }
  }

  /**
    * Whether the monitor is idle, i.e. the task monitor reports no running tasks.
    *
    * @return true when the number of running tasks is zero or less
    */
  def isIdle: Boolean = taskMonitor.runTaskNumber() <= 0

  /**
    * Shuts the monitor down: first the HTTP server, then the task monitor.
    */
  def shutdown(): Unit = {
    httpServer.shutdown()
    taskMonitor.shutdown()
  }

  /**
    * Starts the monitor: first the HTTP server, then the task monitor.
    */
  def start(): Unit ={
    httpServer.start()
    taskMonitor.start()
  }

}

/**
  * Bundles a spider cell with the monitoring listeners attached to it.
  *
  * @param spiderCell        the monitored spider cell
  * @param spiderListener    listener attached to the spider
  * @param processorListener listener attached to the spider's processor
  * @param handlerListeners  listeners attached to the spider's handlers
  * @param pipelineListeners listeners attached to the spider's pipelines
  */
case class MonitorCell(
  spiderCell: SpiderCell,
  spiderListener: GeneralSpiderListener,
  processorListener: GeneralProcessorListener,
  handlerListeners: Seq[GeneralHandlerListener],
  pipelineListeners: Seq[GeneralPipelineListener]
)
